package com.surfilter.massdata.spark.task;

import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

import scala.Tuple2;

import com.act.sparkanalyz.service.impl.SparkService.OutQueueEntity;
import com.act.sparkanalyz.task.ISparkTask;
import com.surfilter.massdata.spark.bean.SAN2100;
import com.surfilter.massdata.spark.util.CommonUtils;
import com.surfilter.massdata.spark.util.DateUtil;
import com.surfilter.massdata.spark.util.URLUtil;

/**
 * Website access trend chart.
 * <p>
 * Statistics on the change in visit volume of focus websites accessed by in-province users
 * (an in-province user is one whose CIP exists in BR2002; an in-province website is one whose
 * DIP exists in BR2002).
 * <p>
 * For each top-level domain, this task sums the {@code dns_visit_count} of all rows of the
 * {@code cip_dip_frame} input and emits one {@link SAN2100} row with
 * {@code buss_type = "WEBSITE_FOCUS_SUM"} for the execution hour.
 * <p>
 * NOTE(review): no BR2002 filtering happens in this class; presumably the
 * {@code cip_dip_frame} input is already restricted upstream — confirm.
 */
public class WebSiteAccessTask implements ISparkTask{

	private static final Log log = LogFactory.getLog(WebSiteAccessTask.class);
	private static final long serialVersionUID = 1L;

	/** Business type written to every output row and used when clearing previously stored rows. */
	private static final String BUSS_TYPE = "WEBSITE_FOCUS_SUM";

	/**
	 * Name of the output queue entry. It is never assigned in this class; presumably the task
	 * framework injects it — confirm.
	 */
	private String outName;

	/**
	 * Computes the per-top-domain visit totals for the execution hour and queues them for output.
	 *
	 * @param dataFrames input frames; {@code "cip_dip_frame"} must provide the {@code domain} and
	 *                   {@code dns_visit_count} columns
	 * @param commandMap command-line options; {@code "-d"} is passed to
	 *                   {@link DateUtil#getExecDate} to select the execution date
	 * @return the output list. It is empty when the work fails before the result is queued.
	 *         Exceptions are logged, not rethrown.
	 */
	@Override
	public List<OutQueueEntity> execute(Map<String, DataFrame> dataFrames,Map<String, String> commandMap) {
		List<OutQueueEntity> outList = new ArrayList<OutQueueEntity>();
		try{
			DataFrame cipDipFrame = dataFrames.get("cip_dip_frame");
			if(cipDipFrame == null){
				// Previously this surfaced as an NPE logged with a null message.
				if(log.isErrorEnabled()){
					log.error("Input frame 'cip_dip_frame' is missing; WebSiteAccessTask produced no output");
				}
				return outList;
			}
			JavaRDD<Row> cipDipRDD = cipDipFrame.toJavaRDD();
			SQLContext sqlContext = cipDipFrame.sqlContext();

			String dayStr = commandMap.get("-d");
			Date date = DateUtil.getExecDate(DateUtil.getPreviousDayHour(new Date()),dayStr);

			JavaRDD<Map<String, Object>> rdd = getRdd(cipDipRDD);
			JavaRDD<SAN2100> javaRDD = createRdd(date, rdd);
			DataFrame df = sqlContext.createDataFrame(javaRDD, SAN2100.class);

			outList.add(new OutQueueEntity(outName, df));

			// NOTE(review): the entity is queued before the old rows for this hour are deleted.
			// If the delete throws, the exception is caught below and the entity is still
			// returned, which may duplicate the hour's data. Confirm whether this order is intended.
			CommonUtils.deleteTaskTable("SAN2100", date, BUSS_TYPE, 0, "hour");
		}catch(Exception e){
			if(log.isErrorEnabled()){
				log.error(e.getMessage(),e);
			}
		}
		return outList;
	}

	/**
	 * Sums {@code dns_visit_count} per top-level domain.
	 * <p>
	 * The method is static so the anonymous Spark functions below have no enclosing-instance
	 * reference. Only the functions themselves, not the whole task, are serialized to executors.
	 *
	 * @param sourceRdd rows with {@code domain} and {@code dns_visit_count} columns
	 * @return one map per top-level domain with keys {@code "domain"} (String) and
	 *         {@code "count_value"} (Long). Rows whose domain cannot be resolved or whose count
	 *         cannot be parsed are logged and dropped.
	 */
	@SuppressWarnings("serial")
	private static JavaRDD<Map<String, Object>> getRdd(JavaRDD<Row> sourceRdd) {
		return sourceRdd.mapToPair(new PairFunction<Row, String, Long>() {

			@Override
			public Tuple2<String, Long> call(Row row) throws Exception {
				try{
					String domain = row.getAs("domain");
					String topDomain = URLUtil.getDomainName("http://" + domain);
					if(StringUtils.isNotBlank(topDomain)){
						long countValue = Long.parseLong(row.getAs("dns_visit_count").toString());
						return new Tuple2<String, Long>(topDomain, countValue);
					}
				}catch(Exception e){
					if(log.isErrorEnabled()){
						log.error(e.getMessage(),e);
					}
				}
				// Placeholder for unusable rows; removed by the filter below before the shuffle.
				return new Tuple2<String, Long>("", 0L);
			}
		}).filter(new Function<Tuple2<String, Long>, Boolean>() {

			@Override
			public Boolean call(Tuple2<String, Long> pair) {
				return StringUtils.isNotBlank(pair._1);
			}
		}).reduceByKey(new Function2<Long, Long, Long>() {

			@Override
			public Long call(Long v1, Long v2) {
				return v1 + v2;
			}
		}).map(new Function<Tuple2<String, Long>, Map<String, Object>>() {

			@Override
			public Map<String, Object> call(Tuple2<String, Long> tuple) {
				Map<String, Object> map = new HashMap<String, Object>();
				map.put("domain", tuple._1);
				map.put("count_value", tuple._2);
				return map;
			}
		});
	}

	/**
	 * Converts the per-domain totals into {@link SAN2100} beans stamped with the time fields of
	 * {@code date}.
	 * <p>
	 * The method is static for the same closure-serialization reason as {@link #getRdd}.
	 *
	 * @param date execution date; its year/half-year/quarter/month/week/day/hour fields are
	 *             copied into every row
	 * @param rdd  maps with {@code "domain"} and {@code "count_value"} keys, as produced by
	 *             {@link #getRdd}
	 * @return one bean per entry with a non-blank domain
	 */
	@SuppressWarnings("serial")
	private static JavaRDD<SAN2100> createRdd(final Date date, JavaRDD<Map<String, Object>> rdd) {
		return rdd.mapPartitions(new FlatMapFunction<Iterator<Map<String, Object>>, SAN2100>() {

			@Override
			public Iterable<SAN2100> call(Iterator<Map<String, Object>> it) throws Exception {
				List<SAN2100> list = new ArrayList<SAN2100>();
				while(it.hasNext()){
					Map<String, Object> map = it.next();
					String domain = CommonUtils.valueOf(map.get("domain"));
					if(StringUtils.isBlank(domain)){
						continue;
					}
					SAN2100 san = new SAN2100();
					san.setYear(DateUtil.getCurrentYear(date));
					san.setHalf_year(DateUtil.getHalfYear(date));
					san.setQuarter(DateUtil.getQuarter(date));
					san.setMonth(DateUtil.getCurrentMonth(date));
					san.setWeek(DateUtil.getCurrentWeek(date));
					san.setDay(DateUtil.getCurrentDay(date));
					san.setHour(DateUtil.getCurrentHour(date));
					san.setSsfl(0);
					san.setBuss_type(BUSS_TYPE);
					san.setTop_domain(domain);
					// getRdd stores the reduced Long here; read it directly instead of
					// round-tripping through a String.
					san.setCount_value(((Number) map.get("count_value")).longValue());
					list.add(san);
				}
				return list;
			}
		});
	}

}
